{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We start by downloading a specific release of the components, because running from master is not a good way to build \"repeatable\" systems."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Download the pinned 0.2.5 release archive of kubeflow/pipelines.\n",
    "# -O fixes the output filename so re-running this cell overwrites the archive\n",
    "# instead of leaving numbered copies (0.2.5.tar.gz.1, ...) next to a possibly stale one.\n",
    "!wget -O 0.2.5.tar.gz https://github.com/kubeflow/pipelines/archive/0.2.5.tar.gz"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Unpack the downloaded archive; the component paths used later in this\n",
    "# notebook are relative to the extracted pipelines-0.2.5/ directory.\n",
    "!tar -x -v -f 0.2.5.tar.gz"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#tag::loadGCSDLComponent[]\n",
    "# Component definition for downloading data from Google Cloud Storage.\n",
    "gcs_download_component = kfp.components.load_component_from_file(\n",
    "    'pipelines-0.2.5/components/google-cloud/storage/download/component.yaml'\n",
    ")\n",
    "#end::loadGCSDLComponent[]\n",
    "#tag::loadTFDVAndFriendsComponents[]\n",
    "# TFX component definitions, all loaded from the extracted 0.2.5 release.\n",
    "tfx_csv_gen = kfp.components.load_component_from_file(\n",
    "    'pipelines-0.2.5/components/tfx/ExampleGen/CsvExampleGen/component.yaml'\n",
    ")\n",
    "tfx_statistic_gen = kfp.components.load_component_from_file(\n",
    "    'pipelines-0.2.5/components/tfx/StatisticsGen/component.yaml'\n",
    ")\n",
    "tfx_schema_gen = kfp.components.load_component_from_file(\n",
    "    'pipelines-0.2.5/components/tfx/SchemaGen/component.yaml'\n",
    ")\n",
    "tfx_example_validator = kfp.components.load_component_from_file(\n",
    "    'pipelines-0.2.5/components/tfx/ExampleValidator/component.yaml'\n",
    ")\n",
    "#end::loadTFDVAndFriendsComponents[]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@kfp.dsl.pipeline(name='DL', description='Sample DL pipeline')\n",
    "def pipeline_with_dl():\n",
    "    # Single-step pipeline: run the GCS download component on one path.\n",
    "    #tag::dlOp[]\n",
    "    # Your path goes here\n",
    "    download_task = gcs_download_component(\n",
    "        gcs_path='gs://ml-pipeline-playground/tensorflow-tfx-repo/tfx/components/testdata/external/csv'\n",
    "    )\n",
    "    #end::dlOp[]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Compile the download pipeline into a package file that can be submitted as a run.\n",
    "kfp.compiler.Compiler().compile(\n",
    "    pipeline_func=pipeline_with_dl,\n",
    "    package_path='dl_pipeline.zip',\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Client object used by the cells below to create experiments and submit pipeline runs.\n",
    "client = kfp.Client()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create an experiment named 'dl' and submit the compiled package as a run in it.\n",
    "my_experiment = client.create_experiment(name='dl')\n",
    "my_run = client.run_pipeline(\n",
    "    experiment_id=my_experiment.id,\n",
    "    job_name='dl',\n",
    "    pipeline_package_path='dl_pipeline.zip',\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#tag::standaloneTFDVPipeline[]\n",
    "@kfp.dsl.pipeline(name='TFDV', description='TF DV Pipeline')\n",
    "def tfdv_pipeline():\n",
    "    # DL with wget, can use gcs instead as well\n",
    "    data_url = 'https://raw.githubusercontent.com/moorissa/medium/master/items-recommender/data/trx_data.csv'\n",
    "    #tag::wget[]\n",
    "    # Shell script run inside the busybox container: fetch the CSV into /tmp/data.\n",
    "    download_script = (\n",
    "        'sleep 1;'\n",
    "        'mkdir -p /tmp/data;'\n",
    "        f'wget {data_url} -O /tmp/data/results.csv'\n",
    "    )\n",
    "    download_op = kfp.dsl.ContainerOp(\n",
    "        name='download',\n",
    "        image='busybox',\n",
    "        command=['sh', '-c'],\n",
    "        arguments=[download_script],\n",
    "        file_outputs={'downloaded': '/tmp/data'},\n",
    "    )\n",
    "    # This expects a directory of inputs not just a single file\n",
    "    #end::wget[]\n",
    "    #tag::csv[]\n",
    "    examples_op = tfx_csv_gen(input_base=download_op.output)\n",
    "    #end::csv[]\n",
    "    #tag::stats[]\n",
    "    stats_op = tfx_statistic_gen(input_data=examples_op.output)\n",
    "    #end::stats[]\n",
    "    #tag::schema[]\n",
    "    schema_task = tfx_schema_gen(stats_op.output)\n",
    "    #end::schema[]\n",
    "    #tag::validate[]\n",
    "    tfx_example_validator(\n",
    "        stats=stats_op.outputs['output'],\n",
    "        schema=schema_task.outputs['output'],\n",
    "    )\n",
    "    #end::validate[]\n",
    "#end::standaloneTFDVPipeline[]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Compile the TFDV pipeline into a package file that can be submitted as a run.\n",
    "kfp.compiler.Compiler().compile(\n",
    "    pipeline_func=tfdv_pipeline,\n",
    "    package_path='tfdv_pipeline.zip',\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create an experiment named 'tfdv_pipeline' and submit the compiled package as a run in it.\n",
    "my_experiment = client.create_experiment(name='tfdv_pipeline')\n",
    "my_run = client.run_pipeline(\n",
    "    experiment_id=my_experiment.id,\n",
    "    job_name='tfdv',\n",
    "    pipeline_package_path='tfdv_pipeline.zip',\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# %pip installs into the environment of the running kernel, whereas a bare\n",
    "# `pip3` shell call may resolve to a different Python installation.\n",
    "%pip install tfx tensorflow-data-validation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#tag::importTFDV[]\n",
    "import tensorflow_data_validation as tfdv\n",
    "#end::importTFDV[]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can download your schema by looking at the inputs/outputs of the schema gen stage in your pipeline run."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#tag::displaySchema[]\n",
    "# Load the schema from the local file schema_info_2 and display it.\n",
    "schema = tfdv.load_schema_text(\"schema_info_2\")\n",
    "tfdv.display_schema(schema)\n",
    "#end::displaySchema[]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#tag::loadTFT[]\n",
    "# Transform component definition, loaded from the extracted 0.2.5 release.\n",
    "tfx_transform = kfp.components.load_component_from_file(\n",
    "    'pipelines-0.2.5/components/tfx/Transform/component.yaml'\n",
    ")\n",
    "#end::loadTFT[]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Placeholder: set this to the location of your TFT module file; it is passed\n",
    "# to the Transform step of the TFX pipeline below.\n",
    "# NOTE(review): the GCS path used earlier in this notebook uses the gs:// scheme -\n",
    "# confirm which scheme is intended when filling this in.\n",
    "module_file = 'gcs://'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@kfp.dsl.pipeline(name='TFX', description='TFX pipeline')\n",
    "def tfx_pipeline():\n",
    "    # DL with wget, can use gcs instead as well\n",
    "    data_url = 'https://raw.githubusercontent.com/moorissa/medium/master/items-recommender/data/trx_data.csv'\n",
    "    # Shell script run inside the busybox container: fetch the CSV into /tmp/data.\n",
    "    download_script = (\n",
    "        'sleep 1;'\n",
    "        'mkdir -p /tmp/data;'\n",
    "        'wget ' + data_url + ' -O /tmp/data/results.csv'\n",
    "    )\n",
    "    download_op = kfp.dsl.ContainerOp(\n",
    "        name='download',\n",
    "        image='busybox',\n",
    "        command=['sh', '-c'],\n",
    "        arguments=[download_script],\n",
    "        file_outputs={'downloaded': '/tmp/data'},\n",
    "    )\n",
    "    examples_op = tfx_csv_gen(input_base=download_op.output)\n",
    "    stats_op = tfx_statistic_gen(input_data=examples_op.output)\n",
    "    schema_task = tfx_schema_gen(stats_op.output)\n",
    "    tfx_example_validator(\n",
    "        stats=stats_op.outputs['output'],\n",
    "        schema=schema_task.outputs['output'],\n",
    "    )\n",
    "    #tag::tft[]\n",
    "    transform_op = tfx_transform(\n",
    "        input_data=examples_op.output,\n",
    "        schema=schema_task.outputs['output'],\n",
    "        module_file=module_file,  # Path to your TFT code on GCS/S3\n",
    "    )\n",
    "    #end::tft[]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Compile the TFX pipeline into a package file that can be submitted as a run.\n",
    "kfp.compiler.Compiler().compile(\n",
    "    pipeline_func=tfx_pipeline,\n",
    "    package_path='tfx_pipeline.zip',\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create an experiment named 'tfx_pipeline' and submit the compiled package as a run in it.\n",
    "my_experiment = client.create_experiment(name='tfx_pipeline')\n",
    "my_run = client.run_pipeline(\n",
    "    experiment_id=my_experiment.id,\n",
    "    job_name='tfx',\n",
    "    pipeline_package_path='tfx_pipeline.zip',\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
